use std::sync::Arc;

use anyhow::Context as _;
use zksync_concurrency::{ctx, error::Wrap as _, scope, time};
use zksync_config::configs::consensus::{ConsensusConfig, ConsensusSecrets};
use zksync_consensus_executor::{self as executor, attestation};
use zksync_consensus_roles::{attester, validator};
use zksync_consensus_storage::BlockStore;
use zksync_dal::consensus_dal;

use crate::{
    config, registry,
    storage::{ConnectionPool, Store},
};

/// Runs the consensus validator of the main node.
///
/// For now the main node is the sole consensus leader, i.e. every L2 block
/// (as generated by `Statekeeper`) is proposed by it.
///
/// Inside a single scope this function:
/// 1. passes the optional genesis spec from `cfg` to `adjust_global_config()`,
/// 2. creates the `Store` and `BlockStore` and runs their runners in the background,
/// 3. checks that the stored genesis selects this node's validator key as the sticky leader,
/// 4. runs the attestation controller in the background (its failures are only logged),
/// 5. runs the executor in the foreground.
///
/// # Errors
///
/// Returns an error if the validator key is absent or cannot be read, if the attester
/// key cannot be read, or if the scope terminates with an internal error.
/// Cancellation of the scope is reported as `Ok(())`.
pub async fn run_main_node(
    ctx: &ctx::Ctx,
    cfg: ConsensusConfig,
    secrets: ConsensusSecrets,
    pool: ConnectionPool,
) -> anyhow::Result<()> {
    let validator_key = config::validator_key(&secrets)
        .context("validator_key")?
        .context("missing validator_key")?;
    // The attester key is optional, only a failure to read it is fatal.
    let attester_key = config::attester_key(&secrets).context("attester_key")?;
    tracing::debug!(
        is_attester = attester_key.is_some(),
        "main node attester mode"
    );

    let outcome: ctx::Result<()> = scope::run!(ctx, |ctx, s| async {
        if let Some(raw_spec) = cfg.genesis_spec.as_ref() {
            let genesis_spec =
                config::GenesisSpec::parse(raw_spec).context("GenesisSpec::parse()")?;
            pool.connection(ctx)
                .await
                .wrap("connection()")?
                .adjust_global_config(ctx, &genesis_spec)
                .await
                .wrap("adjust_global_config()")?;
        }

        // No payload queue here: the main node produces all the L2 blocks on its own.
        let (store, store_runner) = Store::new(ctx, pool.clone(), None, None)
            .await
            .wrap("Store::new()")?;
        s.spawn_bg(async {
            store_runner.run(ctx).await.context("Store::runner()")?;
            Ok(())
        });

        let global_config = pool
            .connection(ctx)
            .await
            .wrap("connection()")?
            .global_config(ctx)
            .await
            .wrap("global_config()")?
            .context("global_config() disappeared")?;

        // Only a genesis which makes this very node the sticky leader is accepted.
        let expected_leader = validator::v1::LeaderSelectionMode::Sticky(validator_key.public());
        if global_config.genesis.leader_selection != expected_leader {
            return Err(anyhow::format_err!(
                "unsupported leader selection mode - main node has to be the leader"
            )
            .into());
        }

        let (block_store, block_store_runner) = BlockStore::new(ctx, Box::new(store.clone()))
            .await
            .wrap("BlockStore::new()")?;
        s.spawn_bg(async {
            block_store_runner
                .run(ctx)
                .await
                .context("BlockStore::run()")?;
            Ok(())
        });

        let attestation = Arc::new(attestation::Controller::new(attester_key));
        // The background task gets its own copies; the originals are needed by the executor.
        let controller_cfg = global_config.clone();
        let controller = attestation.clone();
        s.spawn_bg(async {
            // The node can function without attestation for now, so a failure
            // of the controller is logged instead of being propagated.
            if let Err(err) = run_attestation_controller(ctx, &pool, controller_cfg, controller)
                .await
                .wrap("run_attestation_controller()")
            {
                tracing::error!("attestation controller failed: {err:#}");
            }
            Ok(())
        });

        let executor_cfg = config::executor(&cfg, &secrets, &global_config, None)?;
        let validator_role = executor::Validator {
            key: validator_key,
            replica_store: Box::new(store.clone()),
            payload_manager: Box::new(store.clone()),
        };
        let executor = executor::Executor {
            config: executor_cfg,
            block_store,
            validator: Some(validator_role),
            attestation,
        };

        tracing::info!("running the main node executor");
        executor.run(ctx).await.context("executor")?;
        Ok(())
    })
    .await;

    // Cancellation is a regular way of terminating, only internal errors are reported.
    match outcome {
        Err(ctx::Error::Internal(err)) => Err(err),
        Ok(()) | Err(ctx::Error::Canceled(_)) => Ok(()),
    }
}

/// Drives the attestation state: selects the next batch to attest,
/// starts the attestation for it and stores the certificate once
/// it has been collected.
///
/// Every iteration of the (endless) loop:
/// 1. polls `attestation_status()` every `POLL_INTERVAL` until it reports a batch
///    number which is not lower than the one expected by this loop,
/// 2. waits for the info of that batch and computes its hash,
/// 3. asks the registry for the attester committee of that batch; if there is none,
///    the batch is skipped,
/// 4. stores the committee, starts the attestation, waits for the certificate
///    and inserts it into storage.
///
/// The function returns only with an error (which includes cancellation of `ctx`).
async fn run_attestation_controller(
    ctx: &ctx::Ctx,
    pool: &ConnectionPool,
    cfg: consensus_dal::GlobalConfig,
    attestation: Arc<attestation::Controller>,
) -> ctx::Result<()> {
    const POLL_INTERVAL: time::Duration = time::Duration::seconds(5);
    let registry = registry::Registry::new(pool.clone()).await;
    let registry_addr = cfg.registry_address.map(registry::Address::new);
    // Lowest batch number which the next iteration is allowed to attest.
    let mut next = attester::BatchNumber(0);
    loop {
        // The batch number of the first block might be unknown right after
        // regenesis (no block has been produced yet), hence the polling.
        let status = loop {
            let polled = pool
                .connection(ctx)
                .await
                .wrap("connection()")?
                .attestation_status(ctx)
                .await
                .wrap("attestation_status()")?;
            // The connection has been released by now, so it is not held while sleeping.
            if let Some(status) = polled.filter(|st| st.next_batch_to_attest >= next) {
                break status;
            }
            ctx.sleep(POLL_INTERVAL).await?;
        };
        let batch = status.next_batch_to_attest;
        next = batch.next();

        tracing::info!("waiting for hash of batch {:?}", batch);
        let batch_info = pool
            .wait_for_batch_info(ctx, batch, POLL_INTERVAL)
            .await?;
        let hash = consensus_dal::batch_hash(&batch_info);

        let committee = match registry
            .attester_committee_for(ctx, registry_addr, batch)
            .await
            .wrap("attester_committee_for()")?
        {
            Some(committee) => Arc::new(committee),
            None => {
                tracing::info!("attestation not required");
                continue;
            }
        };

        // Store the committee derived for this batch.
        pool.connection(ctx)
            .await
            .wrap("connection")?
            .upsert_attester_committee(ctx, batch, &committee)
            .await
            .wrap("upsert_attester_committee()")?;

        tracing::info!("attesting batch {:?} with hash {hash:?}", batch);
        let attestation_info = Arc::new(attestation::Info {
            batch_to_attest: attester::Batch {
                hash,
                number: batch,
                genesis: status.genesis,
            },
            committee,
        });
        attestation
            .start_attestation(attestation_info)
            .await
            .context("start_attestation()")?;

        // Nobody but the main node updates the global AttestationStatus,
        // so it is fine to block here until the certificate is available.
        let qc = attestation
            .wait_for_cert(ctx, batch)
            .await?
            .context("attestation config has changed unexpectedly")?;
        tracing::info!("collected certificate for batch {:?}", batch);

        pool.connection(ctx)
            .await
            .wrap("connection()")?
            .insert_batch_certificate(ctx, &qc)
            .await
            .wrap("insert_batch_certificate()")?;
    }
}
